/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package org.apache.hadoop.hive.llap.daemon.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GetTokenRequestProto;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DefaultLlapTokenManager is for renewing and recreating LLAP tokens on a cluster.
 *
 * This class runs in the LLAP Daemon, but the whole process involves two components:
 * AM(s): LlapTaskCommunicator -- LlapProtocolClientProxy
 * Daemon(s): LlapTokenManager
 *
 * DefaultLlapTokenManager tracks llap token instances and handles them as:
 * a) renews periodically when they get closer to expiry date
 * b) recreates when they get close to max lifetime.
 * This logic is needed because Hadoop does not have a unified way for all applications to do this,
 * hence the LLAP_TOKEN specific implementation can be found in this class.
 * The tokens are persisted by a zookeeper-based secretmanager (ZKDelegationTokenSecretManager), and the typical
 * use-case can be like:
 * 1. AM calls Daemons to obtain a token (LlapProtocolClientProxy), using an existing token, or by kerberos login
 * 2. Daemon generates (if needed) and handles token which is then in its scope, and returns the token on request
 * 3. on shutdown: Daemon cancels all tokens which were made by it
 * 4. AM re-requests for a token from existing Daemons in case of communication failure (as the token is cancelled)
 *
 * This approach doesn't need a centralized component to handle tokens, as all daemons handle
 * renewal/recreation of their tokens. The only known drawback is 4) above: if a Daemon stops/fails/crashes, AM should
 * obtain a new token if it uses a token that was generated by that particular Daemon.
 */
public class DefaultLlapTokenManager implements LlapTokenManager {
  private static final Logger LOG = LoggerFactory.getLogger(DefaultLlapTokenManager.class);

  /**
   * A token is recreated (or renewed) once less than this fraction of its whole lifetime
   * (or of its current validity period) is left, e.g. 0.1 means "less than 10% left".
   */
  private static final double REMAINING_RATIO_THRESHOLD = 0.1;

  private final ScheduledExecutorService tokenChecker = Executors.newScheduledThreadPool(1);

  /**
   * Tokens tracked by this manager. The list is touched both by the periodic checker thread and by
   * the threads calling {@link #getToken} / {@link #close()}, so every access must hold the monitor of this list.
   */
  private final List<TokenWrapper> tokens = new ArrayList<>();
  private final SecretManager secretManager;
  private final String clusterUser;

  /**
   * Convenience wrapper for llap tokens, that decodes issue date and max lifetime only once on adding
   * and maintains the expiration time returned by the latest renewal.
   */
  private class TokenWrapper {

    private final Token<LlapTokenIdentifier> realToken;
    private final long issueDate;
    private final long maxDate;
    private long expirationTime = 0;
    private long renewalTime = 0;

    /**
     * Wraps the token and renews it right away.
     *
     * @param token the token to track
     * @throws RuntimeException if the renewal or the decoding of the token identifier fails with an IOException
     */
    public TokenWrapper(Token<LlapTokenIdentifier> token) {
      this.realToken = token;
      renew(); // renew immediately in order to become aware of the expiration date
      try {
        // decode only once, both dates come from the same identifier
        LlapTokenIdentifier identifier = token.decodeIdentifier();
        issueDate = identifier.getIssueDate();
        maxDate = identifier.getMaxDate();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    /**
     * Renews the token through the secret manager and records the new expiration time and the time of renewal.
     *
     * @throws RuntimeException wrapping the IOException thrown by the secret manager
     */
    private void renew() {
      LOG.info("Renewing token: {}", realToken);
      try {
        expirationTime = secretManager.renewToken(realToken, clusterUser);
        renewalTime = Time.now();
        LOG.info("Renewed token: {}", realToken);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public String toString() {
      return realToken.toString();
    }
  }

  /**
   * Creates the manager and schedules the periodic token check, which starts immediately and then runs
   * every {@code LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS} seconds.
   *
   * @param conf configuration (not read by this implementation)
   * @param secretManager the secret manager used for creating, renewing and cancelling tokens
   * @throws RuntimeException if the current user cannot be determined
   */
  public DefaultLlapTokenManager(Configuration conf, SecretManager secretManager) {
    this.secretManager = secretManager;
    try {
      this.clusterUser = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    LOG.info("Initializing periodic token refresh in daemon, will run in every {}s",
        LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS);
    tokenChecker.scheduleAtFixedRate(this::checkTokens, 0, LLAP_TOKEN_CHECK_INTERVAL_IN_DAEMON_SECONDS,
        TimeUnit.SECONDS);
  }

  /**
   * One run of the periodic check: cancels and drops tokens which are close to their max date,
   * renews tokens which are close to their expiration time.
   *
   * This method must never throw: an exception escaping from a task scheduled with scheduleAtFixedRate
   * suppresses all subsequent executions, which would silently stop token maintenance.
   */
  private void checkTokens() {
    try {
      synchronized (tokens) {
        LOG.debug("Checking tokens, count: {}", tokens.size());
        ListIterator<TokenWrapper> tokensIt = tokens.listIterator();
        while (tokensIt.hasNext()) {
          TokenWrapper token = tokensIt.next();
          if (needsRecreate(token)) {
            /* If the token needs to be recreated, it's because maxDate is close. We can remove it here,
             * so it won't be returned by this daemon anymore. The cancelToken represents the same in the ZK-based
             * secretManager (removes the token), so additional cleanup is not needed.
             */
            try {
              LOG.info("Cancelling token: {}", token.realToken);
              secretManager.cancelToken(token.realToken, clusterUser);
              tokensIt.remove();
            } catch (IOException | RuntimeException e) {
              // the token stays in the list, so cancelling is retried on the next run
              LOG.error("Error while cancelling token: {}", token.realToken, e);
            }
          } else if (needsRenew(token)) {
            try {
              token.renew();
            } catch (RuntimeException e) {
              // keep going with the other tokens, renewal is retried on the next run
              LOG.error("Error while renewing token: {}", token.realToken, e);
            }
          }
        }
      }
    } catch (RuntimeException e) {
      // backstop, see the method comment
      LOG.error("Unexpected error while checking tokens", e);
    }
  }

  /**
   * Tells whether the token is close to its max date, i.e. less than
   * {@link #REMAINING_RATIO_THRESHOLD} of its whole lifetime (maxDate - issueDate) is left.
   */
  private boolean needsRecreate(TokenWrapper token) {
    long now = Time.now();
    long tokenWholeLifeTimeMs = token.maxDate - token.issueDate;
    long tokenRemainingLifeTimeMs = Math.max(token.maxDate - now, 0);
    // a number which tells how close is maxDate, e.g. 0.1 means only 10% of the token lifetime left
    double tokenRemainingLifeTimePercent = remainingRatio(tokenRemainingLifeTimeMs, tokenWholeLifeTimeMs);
    boolean needsRecreate = tokenRemainingLifeTimePercent < REMAINING_RATIO_THRESHOLD;

    LOG.debug(
        "Token needsRecreate? {}, now: {}, maxDate: {}, issueDate: {}, tokenWholeLifeTime(ms): {}, "
            + "tokenRemainingLifeTime(ms): {}, tokenRemainingLifeTimePercent: {}%",
        needsRecreate, now, token.maxDate, token.issueDate, tokenWholeLifeTimeMs, tokenRemainingLifeTimeMs,
        toPercentString(tokenRemainingLifeTimePercent));
    return needsRecreate;
  }

  /**
   * Tells whether the token is close to its expiration time, i.e. less than
   * {@link #REMAINING_RATIO_THRESHOLD} of its current validity period (expirationTime - renewalTime) is left.
   */
  private boolean needsRenew(TokenWrapper token) {
    long now = Time.now();
    long tokenWholeValidityPeriodMs = token.expirationTime - token.renewalTime;
    long tokenRemainingValidityPeriodMs = Math.max(token.expirationTime - now, 0);
    // a number which tells how close is token expiry time, e.g. 0.1 means only 10% of the token validity period left
    double tokenRemainingValidityPeriodPercent =
        remainingRatio(tokenRemainingValidityPeriodMs, tokenWholeValidityPeriodMs);
    boolean needsRenew = tokenRemainingValidityPeriodPercent < REMAINING_RATIO_THRESHOLD;

    LOG.debug(
        "Token needsRenew? {}, now: {}, expirationTime: {}, renewalTime: {}, tokenWholeValidityPeriod(ms): {},"
            + " tokenRemainingValidityPeriod(ms): {}, tokenRelativeRemainingValidityPeriod: {}%",
        needsRenew, now, token.expirationTime, token.renewalTime, tokenWholeValidityPeriodMs,
        tokenRemainingValidityPeriodMs, toPercentString(tokenRemainingValidityPeriodPercent));
    return needsRenew;
  }

  /**
   * Returns remainingMs / wholeMs as a fraction. A non-positive whole period is reported as 0 (nothing left),
   * otherwise 0/0 would yield NaN, which compares false against the threshold and the token would never
   * be recreated/renewed.
   */
  private static double remainingRatio(long remainingMs, long wholeMs) {
    return wholeMs > 0 ? (double) remainingMs / wholeMs : 0.0;
  }

  /** Formats a fraction as a percent value with one decimal, e.g. 0.1234 -> "12.3". */
  private String toPercentString(double dblNumber) {
    return Double.toString((double) Math.round(dblNumber * 1000) / 10);
  }

  /**
   * Returns the first token tracked by this manager, or generates (and starts tracking) a new one
   * if there is none.
   *
   * @param request the token request, its app id (if present) is used when a new token is generated
   * @param isSigningRequired passed to the secret manager when a new token is generated
   * @return the tracked or the newly generated token
   * @throws IOException if the secret manager fails to create a new token
   */
  @Override
  public Token<LlapTokenIdentifier> getToken(GetTokenRequestProto request, boolean isSigningRequired)
      throws IOException {
    // check-then-act on the list has to be atomic with respect to the checker thread and other callers
    synchronized (tokens) {
      if (tokens.isEmpty()) {
        return generateToken(request, isSigningRequired);
      }
      Token<LlapTokenIdentifier> token = tokens.get(0).realToken;
      LOG.debug("Returning already existing token: {}", token);
      return token;
    }
  }

  /**
   * Creates a new token through the secret manager and adds it to the tracked tokens.
   */
  private Token<LlapTokenIdentifier> generateToken(GetTokenRequestProto request, boolean isSigningRequired)
      throws IOException {
    Token<LlapTokenIdentifier> token =
        secretManager.createLlapToken(request.hasAppId() ? request.getAppId() : null, null, isSigningRequired);
    synchronized (tokens) {
      tokens.add(new TokenWrapper(token));
      LOG.info("Added new token: {}, #tokens: {}", token, tokens.size());
    }
    return token;
  }

  /**
   * Stops the periodic check and cancels all tokens tracked by this manager.
   */
  @Override
  public void close() {
    // no further periodic runs are started after shutdown(); a run in progress is not interrupted,
    // it is waited for below by acquiring the lock it holds
    tokenChecker.shutdown();
    synchronized (tokens) {
      cancelTokens(tokens);
    }
  }

  /**
   * Cancels the given tokens one by one and removes the successfully cancelled ones from the list.
   * A token which cannot be cancelled is logged and left in the list.
   */
  private void cancelTokens(List<TokenWrapper> tokensToCancel) {
    ListIterator<TokenWrapper> tokensIt = tokensToCancel.listIterator();
    while (tokensIt.hasNext()) {
      TokenWrapper token = tokensIt.next();
      try {
        secretManager.cancelToken(token.realToken, clusterUser);
        tokensIt.remove();
      } catch (IOException e) {
        LOG.warn("Cannot cancel token while shutting down (on IOException): " + token + ", giving up", e);
      }
    }
  }
}
